跳到主要内容

Java 并发编程-并发包下的工具类

CountDownLatch 计数

它其实是作用于线程当中的,它就像一个门栓,利用它可以实现类似计数器的功能。比如有一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时就可以利用 CountDownLatch 来实现这种功能了。

要注意的是,它是一次性的,设置的计数用完就不能关上了。

怎么使用 CountDownLatch

看它的构造方法

// count 就是需要等待的线程数量
public CountDownLatch(int count)

然后一些常用的方法

// 调用此方法的线程会被阻塞,直到 CountDownLatch 的 count 为 0
public void await() throws InterruptedException

// 和上面的 await() 作用基本一致,只是可以设置一个最长等待时间(等待一定的时间后 count 值还没变为 0 的话就会继续执行)
public boolean await(long timeout, TimeUnit unit) throws InterruptedException

// 会将 count 减 1,直至为 0
public void countDown()

使用例

主线程需要等待其他线程的任务完成之后,才继续执行的代码。

public class Temp {

public static void main(String[] args) throws InterruptedException {
// 创建一个 CountDownLatch 设置计数器为 3(代表要等待三个线程)
CountDownLatch latch = new CountDownLatch(3);

for (int i = 0; i < 3; i++) {
new Thread(()-> {
System.out.println(Thread.currentThread().getName() + "线程正在执行");
try {
TimeUnit.SECONDS.sleep(2);
// 计数器减一
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

System.out.println("等待三个子线程执行");
latch.await();
System.out.println("三个子线程执行完毕");
}
}

CyclicBarrier 回环栅栏

通过它可以实现 让一组线程等待至某个状态之后再全部同时执行后面的状态。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。

暂且把这个状态就叫做 barrier,当调用 await() 方法之后,线程就处于 barrier 了。

CyclicBarrier 提供 2 个构造器:

// 参数 parties 指让多少个线程或者任务等待至 barrier 状态;
// 参数 barrierAction 为当这些线程都达到 barrier 状态时会执行的内容。
public CyclicBarrier(int parties, Runnable barrierAction)

public CyclicBarrier(int parties)

CyclicBarrier 中最重要的方法就是 await 方法,它有 2 个重载版本:

// 用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任务;
public int await()
// 和上面的 await() 作用基本一致,只是可以设置一个最长等待时间(等待一定的时间后 还没全变 barrier 的话就会继续执行)
public int await(long timeout, TimeUnit unit)

假若有若干个线程都要打印数据,并且只有所有线程都打印完数据之后,这些线程才能继续做后面的事情,此时就可以利用 CyclicBarrier 了:

public class Temp {

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);

for (int i = 0; i < 3; i++) {
new Thread(() -> {

System.out.println(Thread.currentThread().getName() + "线程正在执行");
try {

// 让每个线程随机等待 5-2 两秒,使得每个线程不是同时执行完成
TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 2);
System.out.println("线程 " + Thread.currentThread().getName() + " 写入数据完毕,等待其他线程写入完毕");

// 让这线程等待其它的线程执行完成
barrier.await();

System.out.println(Thread.currentThread().getName() + " 开始执行其它任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}

这里再介绍一下它的第二个构造方法,它可以传入一个 Runnable 接口,它会在所有线程继续执行之前被调用

public class Temp {

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, ()-> {
System.out.println("==================所有线程写入完毕=================");
});

for (int i = 0; i < 3; i++) {
new Thread(() -> {

System.out.println(Thread.currentThread().getName() + "线程正在执行");
try {

// 让每个线程随机等待 5-2 两秒,使得每个线程不是同时执行完成
TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 2);
System.out.println("线程 " + Thread.currentThread().getName() + " 写入数据完毕,等待其他线程写入完毕");

// 让这线程等待其它的线程执行完成
barrier.await();

System.out.println(Thread.currentThread().getName() + " 开始执行其它任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}

输出为:

Thread-1线程正在执行
Thread-0线程正在执行
Thread-2线程正在执行
线程 Thread-2 写入数据完毕,等待其他线程写入完毕
线程 Thread-0 写入数据完毕,等待其他线程写入完毕
线程 Thread-1 写入数据完毕,等待其他线程写入完毕
==================所有线程写入完毕=================
Thread-1 开始执行其它任务
Thread-0 开始执行其它任务
Thread-2 开始执行其它任务

Semaphore 信号量

参考资料 17、详解java线程同步工具Semaphore的使用

信号量(Semaphore)主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

Semaphore 用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。 一个线程获取许可证就调用 acquire 方法,用完了释放资源就调用 release 方法。

不过这样的解释实在有点抽象,现在用我自己的话来解释一下:

相信在学生时代都去餐厅打过饭,假如有3个窗口可以打饭,同一时刻也只能有3名同学打饭。第四个人来了之后就必须在外面等着,只要有打饭的同学好了,就可以去相应的窗口了。

比如说这张图,就全是了Semaphore的基本使用。认识一个知识点的最好方式就是直接去使用,我们干脆直接上代码来看看如何使用。

它常用的方法就下面这两个

// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
acquire() // acquire(获取)当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。


// 释放一个许可,将其返回给信号量。
release() // release(释放)实际上会将信号量的值加 1,然后唤醒等待的线程。

Semaphore 使用案例:3 个停车位,6 辆车去抢,走一辆,抢一个停车位。

public class Temp {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);

// 创建 6个线程
for (int i = 0; i < 6; i++) {

// 在线程里面 “抢车位”,每个线程都能抢到车位,但是顺序不同,后面的线程得等前面的线程离开后才能取得车位
new Thread(()-> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"号抢到车位");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"号离开");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放一个许可
semaphore.release();
}
}, "线程:\t " + i).start();
}
}
}

如上的例子所示,最多只有 3个线程同时取得 semaphore 其它线程得等其中一个线程执行 release() 方法才有机会去执行

Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限

Condition 监视器

生产者消费者问题都是使用的 synchronized 和 notifyAll()wait() 实现的,那使用 Lock 如何实现呢?关键就在于如何找到代替 wait、notifyAll 的工具?

这时就需要使用 Condition 工具类来实现 wait 和 notify 的功能

使用 Condition 时,引用的 Condition 对象必须从 Lock 实例的 newCondition() 返回,这样才能获得一个绑定了 Lock 实例的 Condition 实例。

Condition 提供的 await()signal()signalAll() 原理和 synchronized 锁对象的 wait()notify()notifyAll() 是一致的,并且其行为也是一样的:

  • await() 会释放当前锁,进入等待状态;
  • signal() 会唤醒某个等待线程;
  • signalAll() 会唤醒所有等待线程;
  • 唤醒线程从 await() 返回后需要重新获得锁。

tryLock() 类似,await() 可以在等待指定时间后,如果还没有被其他线程通过 signal()signalAll() 唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}

使用例

class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();

public void addTask(String s) {
lock.lock();
try {
queue.add(s);
// 通知全部 wait 方法
condition.signalAll();
} finally {
lock.unlock();
}
}

public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
// 释放锁,等价 wait
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}

LockSupport 线程的阻塞和唤醒 🔥

LockSupport 是构建同步组件的基础工具,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。

这个工具类的所有方法都是静态的,底层采用 UNSAFE 直接操作的内存,可以实现线程的阻塞和唤醒

public static void park() {
UNSAFE.park(false, 0L);
}

所以我们大概知道这么多就可以了,当然有一点非常重要,那就是 LockSupport 在进行线程阻塞和唤醒的时候是不需要获取锁的

作用:

  • park:阻塞一个线程
  • unpark:唤醒一个线程

为什么说是构建同步组件的基础工具呢,是因为 AQS 中的阻塞和唤醒就是基于 LockSupport 做的

而 ReentrantLock 中的 Sync 又是继承了 AQS 来完成的锁,所以说他是构建同步组件的基础工具

如何使用呢?

这里举一个面试题的例子:

编写一个程序,开启三个线程,这三个线程的 ID 分别是 A、B 和 C,每个线程把自己的 ID 在屏幕上打印 10 遍,要求输出结果必须按 ABC 的顺序显示,如 ABCABCABC... 依次递推

编写代码:

public class PrintABC {

static Thread threadA, threadB, threadC;

public static void main(String[] args) {
threadA = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 打印当前线程名称
System.out.print(Thread.currentThread().getName());
// 唤醒下一个线程
LockSupport.unpark(threadB);
// 当前线程阻塞
LockSupport.park();
}
}, "A");

threadB = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 先阻塞等待被唤醒
LockSupport.park();
System.out.print(Thread.currentThread().getName());
// 唤醒下一个线程
LockSupport.unpark(threadC);
}
}, "B");
threadC = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 先阻塞等待被唤醒
LockSupport.park();
System.out.print(Thread.currentThread().getName());
// 唤醒下一个线程
LockSupport.unpark(threadA);
}
}, "C");
threadA.start();
threadB.start();
threadC.start();
}
}

ScheduledThreadPoolExecutor 定时任务

参考资料 第二十章 计划任务

自JDK 1.5 开始,JDK提供了 ScheduledThreadPoolExecutor 类用于计划任务(又称定时任务),这个类有两个用途:

  • 在给定的延迟之后运行任务
  • 周期性重复执行任务

在这之前,是使用 Timer 类来完成定时任务的,但是Timer有缺陷:

  • Timer 是单线程模式;
  • 如果在执行任务期间某个 TimerTask 耗时较久,那么就会影响其它任务的调度;
  • Timer 的任务调度是基于绝对时间的,对系统时间敏感;
  • Timer 不会捕获执行 TimerTask 时所抛出的异常,由于 Timer 是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。

所以JDK 1.5之后,大家就摒弃 Timer 开始使用 ScheduledThreadPoolExecutor

假设我有一个需求,指定时间给大家发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后想用一个定时任务,每隔1秒检查数据库在当前时间有没有需要发送的消息(轮询),那这个计划任务怎么写?下面是一个Demo:

public class Temp {
private static final ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());

private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
// 新建一个固定延迟时间的计划任务
executor.scheduleWithFixedDelay(() -> {
if (haveMsgAtCurrentTime()) {
System.out.println(df.format(new Date()));
System.out.println("大家注意了,我要发消息了");
}
}, 1, 1, TimeUnit.SECONDS);
}

public static boolean haveMsgAtCurrentTime() {
//查询数据库,有没有当前时间需要发送的消息
//这里省略实现,直接返回true
return true;
}
}
2021-02-03 22:45:39
大家注意了,我要发消息了
2021-02-03 22:45:40
大家注意了,我要发消息了
2021-02-03 22:45:41
大家注意了,我要发消息了
2021-02-03 22:45:42
大家注意了,我要发消息了
2021-02-03 22:45:43
大家注意了,我要发消息了
2021-02-03 22:45:44
大家注意了,我要发消息了
2021-02-03 22:45:45
大家注意了,我要发消息了